home *** CD-ROM | disk | FTP | other *** search
/ Usenet 1993 July / InfoMagic USENET CD-ROM July 1993.ISO / sources / unix / volume22 / queuer / part03 < prev    next >
Encoding:
Internet Message Format  |  1990-06-07  |  33.4 KB

  1. Subject:  v22i009:  Multi-system program queueing package, Part03/03
  2. Newsgroups: comp.sources.unix
  3. Approved: rsalz@uunet.UU.NET
  4. X-Checksum-Snefru: 65243a6f 3368d383 5c915f62 48133fd4
  5.  
  6. Submitted-by: Scott Bradner <sob@harvisr.harvard.edu>
  7. Posting-number: Volume 22, Issue 9
  8. Archive-name: queuer/part03
  9.  
  10. #! /bin/sh
  11. # This is a shell archive.  Remove anything before this line, then unpack
  12. # it by saving it into a file and typing "sh file".  To overwrite existing
  13. # files, type "sh file -c".  You can also feed this as standard input via
  14. # unshar, or by typing "sh <file", e.g..  If this archive is complete, you
  15. # will see the following message at the end:
  16. #        "End of archive 3 (of 3)."
  17. # Contents:  enqueue.c qmaster.c
  18. # Wrapped by rsalz@coconut.bbn.com on Tue May  1 17:18:29 1990
  19. PATH=/bin:/usr/bin:/usr/ucb ; export PATH
  20. if test -f 'enqueue.c' -a "${1}" != "-c" ; then 
  21.   echo shar: Will not clobber existing file \"'enqueue.c'\"
  22. else
  23. echo shar: Extracting \"'enqueue.c'\" \(11727 characters\)
  24. sed "s/^X//" >'enqueue.c' <<'END_OF_FILE'
  25. X/* Copyright 1990  The President and Fellows of Harvard University
  26. X
  27. XPermission to use, copy, modify, and distribute this program for any
  28. Xpurpose and without fee is hereby granted, provided that this
  29. Xcopyright and permission notice appear on all copies and supporting
  30. Xdocumentation, the name of Harvard University not be used in advertising
  31. Xor publicity pertaining to distribution of the program, or to results
  32. Xderived from its use, without specific prior written permission, and notice
  33. Xbe given in supporting documentation that copying and distribution is by
  34. Xpermission of Harvard University.  Harvard University makes no
  35. Xrepresentations about the suitability of this software for any purpose.
  36. XIt is provided "as is" without express or implied warranty.    */
  37. X
  38. X
  39. X/* enqueue.c - Dan Lanciani '85 */
  40. X
  41. X#include <sys/param.h>
  42. X#include <sys/stat.h>
  43. X#include <sys/socket.h>
  44. X#include <netinet/in.h>
  45. X#include <sys/times.h>
  46. X#include <signal.h>
  47. X#include <netdb.h>
  48. X#include <pwd.h>
  49. X#include <grp.h>
  50. X#include <errno.h>
  51. X#include <stdio.h>
  52. X
  53. X#include "queue.h"
  54. X
  55. Xextern int errno;
  56. Xextern char **environ;
  57. Xint queuegroup, flat, pid, nofiles, nifiles, byebye();
  58. Xtime_t starttime, time();
  59. Xchar **ofiles, **ifiles, **lfiles;
  60. Xlong unid;
  61. X
  62. Xenqueue(s, host, sin, local, recover)
  63. Xchar *host;
  64. Xstruct sockaddr_in *sin;
  65. X{
  66. X    int se = -1, sargc, i, uid, gid, ng, nenviron;
  67. X    register long l;
  68. X    gid_t groups[NGROUPS];
  69. X    char **sargv, buf[BUFSIZ], *user, *group, *grl[XGROUPS];
  70. X    char *cwd, *xcwd, **envp, name1[BUFSIZ], name2[BUFSIZ];
  71. X    register struct passwd *pw;
  72. X    register struct group *gr;
  73. X    struct stat statb;
  74. X    struct hostent *hp;
  75. X    struct servent *sp;
  76. X    struct sockaddr_in sin2;
  77. X    register FILE *n, *m, *ck;
  78. X    FILE *popen();
  79. X
  80. X    if(gr = getgrnam(QUEUEGROUP))
  81. X        queuegroup = gr->gr_gid;
  82. X    else
  83. X        queuegroup = NOBODY;
  84. X    getstr(s, buf);
  85. X    if(recover) {
  86. X        host = newstring(buf);
  87. X        gethostname(buf, sizeof(buf));
  88. X        local = !strcmp(host, buf);
  89. X    }
  90. X    else {
  91. X        i = (u_short)atoi(buf);
  92. X        if(i) {
  93. X            if(i >= IPPORT_RESERVED)
  94. X                exit(1);
  95. X            ng = IPPORT_RESERVED - 1;
  96. X            if((se = rresvport(&ng)) < 0)
  97. X                exit(1);
  98. X            sin->sin_port = htons((u_short)i);
  99. X            if(connect(se, sin, sizeof(struct sockaddr_in)))
  100. X                exit(1);
  101. X        }
  102. X    }
  103. X    flat = local || infile(FLATFILE, host);
  104. X    sprintf(buf, "%s/%d", SPOOLDIR, getpid());
  105. X    if(!(ck = fopen(buf, "w")))
  106. X        exit(1);
  107. X    xfputs(host, ck);
  108. X    getstr(s, buf);
  109. X    unid = atol(buf);
  110. X    sprintf(buf, "%s/%ld", SPOOLDIR, unid);
  111. X    if(unid)
  112. X        unlink(buf);
  113. X    if(recover)
  114. X        unid = 0;
  115. X    fprintf(ck, "%ld", unid);
  116. X    putc('\0', ck);
  117. X    if(unid) {
  118. X        if(!(n = fopen(buf, "w")))
  119. X            exit(1);
  120. X        fprintf(n, "%d", getpid());
  121. X        putc('\0', n);
  122. X        fclose(n);
  123. X    }
  124. X    getstr(s, buf);
  125. X    xfputs(buf, ck);
  126. X    sargc = atoi(buf);
  127. X    sargv = (char **)malloc((sargc+1) * sizeof(char *));
  128. X    for(i = 0; i < sargc; i++) {
  129. X        sargv[i] = newstring(getstr(s, buf));
  130. X        xfputs(buf, ck);
  131. X    }
  132. X    sargv[i] = NULL;
  133. X    user = newstring(getstr(s, buf));
  134. X    xfputs(buf, ck);
  135. X    if(flat) {
  136. X        if(!(pw = getpwnam(user)))
  137. X            exit(1);
  138. X        uid = pw->pw_uid;
  139. X    }
  140. X    else
  141. X        uid = NOBODY;
  142. X    group = newstring(getstr(s, buf));
  143. X    xfputs(buf, ck);
  144. X    if(flat) {
  145. X        if(!(gr = getgrnam(group)))
  146. X            exit(1);
  147. X        gid = gr->gr_gid;
  148. X    }
  149. X    else
  150. X        gid = NOBODY;
  151. X    getstr(s, buf);
  152. X    xfputs(buf, ck);
  153. X    if((ng = atoi(buf)) > XGROUPS)
  154. X        exit(1);
  155. X    for(i = 0; i < ng; i++) {
  156. X        grl[i] = newstring(getstr(s, buf));
  157. X        xfputs(buf, ck);
  158. X        if(flat) {
  159. X            if(!(gr = getgrnam(grl[i])))
  160. X                exit(1);
  161. X            groups[i] = gr->gr_gid;
  162. X        }
  163. X        else
  164. X            groups[i] = NOGROUP;
  165. X    }
  166. X    xcwd = cwd = newstring(getstr(s, buf));
  167. X    xfputs(buf, ck);
  168. X    getstr(s, buf);
  169. X    nenviron = atoi(buf);
  170. X    xfputs(buf, ck);
  171. X    envp = (char **)malloc((nenviron+1) * sizeof(char *));
  172. X    for(i = 0; i < nenviron; i++) {
  173. X        envp[i] = newstring(getstr(s, buf));
  174. X        xfputs(buf, ck);
  175. X    }
  176. X    envp[i] = NULL;
  177. X    environ = envp;
  178. X    if(readconf(sargv[0]))
  179. X        exit(1);
  180. X    if(recover) {
  181. X        if(mode == QM_INTERACTIVE || !restart) {
  182. X            fclose(ck);
  183. X            sprintf(buf, "%s/%d", SPOOLDIR, getpid());
  184. X            unlink(buf);
  185. X            sprintf(buf, "%s/%d.dir", SPOOLDIR, getpid());
  186. X            if(!access(buf, 0)) {
  187. X                if(!vfork()) {
  188. X                    execl("/bin/rm", "rm", "-rf", buf, 0);
  189. X                    exit(1);
  190. X                }
  191. X                wait(0);
  192. X            }
  193. X            exit(0);
  194. X        }
  195. X        mode = QM_BATCH;
  196. X    }
  197. X
  198. X    if(!local) {
  199. X        if(!recover) {
  200. X            sprintf(buf, "%s/%d.dir", SPOOLDIR, getpid());
  201. X            if(!access(buf, 0)) {
  202. X                if(!vfork()) {
  203. X                    execl("/bin/rm", "rm", "-rf", buf, 0);
  204. X                    exit(1);
  205. X                }
  206. X                wait(0);
  207. X            }
  208. X            mkdir(buf, 0700);
  209. X        }
  210. X        chown(buf, uid, gid);
  211. X        xcwd = newstring(buf);
  212. X    }
  213. X    if(priv)
  214. X        setregid(gid, queuegroup);
  215. X    else
  216. X        setgid(gid);
  217. X    setgroups(ng, groups);
  218. X    setreuid(uid, 0);
  219. X    chdir(xcwd);
  220. X    nofiles = nifiles = 0;
  221. X    while(1) {
  222. X        getstr(s, buf);
  223. X        xfputs(buf, ck);
  224. X        if(!strcmp(buf, "done"))
  225. X            break;
  226. X        if(!strcmp(buf, "copyout")) {
  227. X            getstr(s, buf);
  228. X            xfputs(buf, ck);
  229. X            nofiles = atoi(buf);
  230. X            ofiles = (char **)malloc(nofiles * sizeof(char *));
  231. X            for(i = 0; i < nofiles; i++) {
  232. X                getstr(s, buf);
  233. X                xfputs(buf, ck);
  234. X                ofiles[i] = newstring(buf);
  235. X            }
  236. X            continue;
  237. X        }
  238. X        if(!strcmp(buf, "efs")) {
  239. X            getstr(s, buf);
  240. X            xfputs(buf, ck);
  241. X            nifiles = atoi(buf);
  242. X            ifiles = (char **)malloc(nifiles * sizeof(char *));
  243. X            lfiles = (char **)malloc(nifiles * sizeof(char *));
  244. X            for(i = 0; i < nifiles; i++) {
  245. X                getstr(s, buf);
  246. X                xfputs(buf, ck);
  247. X                ifiles[i] = newstring(buf);
  248. X#ifdef    SANEEFS
  249. X                if(ifiles[i][0] == '/') {
  250. X                    sprintf(buf,"/r/%s%s",host,ifiles[i]);
  251. X                    lfiles[i] = newstring(buf);
  252. X                }
  253. X                else
  254. X                    lfiles[i] = ifiles[i];
  255. X#else
  256. X                getstr(s, buf);
  257. X                xfputs(buf, ck);
  258. X                lfiles[i] = newstring(buf);
  259. X#endif
  260. X            }
  261. X            if(xcwd != cwd)
  262. X                free(xcwd);
  263. X            if(strncmp(cwd, "/r/", 3)) {
  264. X                sprintf(buf, "/r/%s%s", host, cwd);
  265. X                xcwd = newstring(buf);
  266. X            }
  267. X            else
  268. X                xcwd = cwd;
  269. X            chdir(xcwd);
  270. X            continue;
  271. X        }
  272. X        if(!strcmp(buf, "copyin")) {
  273. X            getstr(s, buf);
  274. X            xfputs(buf, ck);
  275. X            nifiles = atoi(buf);
  276. X            ifiles = (char **)malloc(nifiles * sizeof(char *));
  277. X            lfiles = (char **)malloc(nifiles * sizeof(char *));
  278. X            for(i = 0; i < nifiles; i++) {
  279. X                getstr(s, buf);
  280. X                xfputs(buf, ck);
  281. X                ifiles[i] = newstring(buf);
  282. X                if(rindex(ifiles[i], '/'))
  283. X                    lfiles[i] = rindex(ifiles[i], '/') + 1;
  284. X                else
  285. X                    lfiles[i] = ifiles[i];
  286. X                if(!recover) {
  287. X                    if(!(n = fopen(lfiles[i], "w")))
  288. X                        exit(1);
  289. X                    chown(lfiles[i], uid, gid);
  290. X                    getstr(s, buf);
  291. X                    l = atol(buf);
  292. X                    if(l < 0) {
  293. X                        unlink(lfiles[i]);
  294. X                        l = 0;
  295. X                    }
  296. X                    while(l) {
  297. X                        ng = sizeof(buf);
  298. X                        if(ng > l)
  299. X                            ng = l;
  300. X                        if((ng = read(s, buf, ng)) <= 0)
  301. X                            exit(1);
  302. X                        l -= ng;
  303. X                        fwrite(buf, ng, 1, n);
  304. X                    }
  305. X                    fclose(n);
  306. X                }
  307. X            }
  308. X        continue;
  309. X        }
  310. X    }
  311. X    fclose(ck);
  312. X    for(i = 1; i < sargc; i++)
  313. X        for(ng = 0; ng < nifiles; ng++)
  314. X            if(!strcmp(sargv[i], ifiles[ng]))
  315. X                sargv[i] = lfiles[ng];
  316. X
  317. X    if(mode == QM_BATCH) {
  318. X        close(s);
  319. X        if(se >= 0)
  320. X            close(se);
  321. X        sprintf(buf, "%s/%d.batch", SPOOLDIR, getpid());
  322. X        s = creat(buf, 0600);
  323. X        chown(buf, uid, gid);
  324. X        sprintf(buf, "%s/%d.ebatch", SPOOLDIR, getpid());
  325. X        se = creat(buf, 0600);
  326. X        chown(buf, uid, gid);
  327. X    }
  328. X    for(i = 0; i < 3; i++)
  329. X        dup2(s, i);
  330. X    if(se >= 0)
  331. X        dup2(se, 2);
  332. X    close(s);
  333. X    close(se);
  334. X    if(mode != QM_INTERACTIVE) {
  335. X        close(0);
  336. X        open("/dev/null", 2);
  337. X    }
  338. X
  339. X    waitrun(queue);
  340. X    if(minload)
  341. X        while(getload() > minload)
  342. X            sleep(60);
  343. X    sprintf(buf, "%s/%d", SPOOLDIR, getpid());
  344. X    chmod(buf, 0755);
  345. X    pid = 0;
  346. X    starttime = time(0);
  347. X    signal(SIGTERM, byebye);
  348. X    pid = cspawn(prog, sargv);
  349. X    pcontrol();
  350. X    while((i = wait(0)) != pid)
  351. X        if(i < 0 && errno != EINTR)
  352. X            break;
  353. X    signal(SIGALRM, SIG_IGN);
  354. X    alarm(0);
  355. X    signal(SIGTERM, SIG_IGN);
  356. X    killpg(pid, SIGHUP);
  357. X    killpg(pid, SIGCONT);
  358. X    sleep(2);
  359. X    killpg(pid, 9);
  360. X    pid = getpid();
  361. X    if(!access(QACCT, 0) && (n = fopen(QACCT, "a"))) {
  362. X        struct tms tms;
  363. X        times(&tms);
  364. X        fprintf(n, "%ld\t%ld\t%s\t%s\t%s\t%s\n",
  365. X            tms.tms_utime, tms.tms_stime,
  366. X            sargv[0], user, group, host);
  367. X        fclose(n);
  368. X    }
  369. X    if(!local) {
  370. X        sprintf(buf, "%s/%d.dir", SPOOLDIR, pid);
  371. X        if(!access(buf, 0)) {
  372. X            if(!vfork()) {
  373. X                execl("/bin/rm", "rm", "-rf", buf, 0);
  374. X                exit(1);
  375. X            }
  376. X            wait(0);
  377. X        }
  378. X    }
  379. X    if(unid) {
  380. X        sprintf(buf, "%s/%ld", SPOOLDIR, unid);
  381. X        unlink(buf);
  382. X    }
  383. X    sprintf(name1, "%s/%s", SPOOLDIR, queue);
  384. X    lock(name1);
  385. X    n = fopen(name1, "r");
  386. X    strcpy(name2, name1);
  387. X    strcat(name2, ".tmp");
  388. X    m = fopen(name2, "w");
  389. X    i = 0;
  390. X    while(fgets(buf, sizeof(buf), n))
  391. X        if(atoi(buf) != pid) {
  392. X            if(i++ < maxrun)
  393. X                kill(atoi(buf), SIGALRM);
  394. X            fputs(buf, m);
  395. X        }
  396. X    fclose(m);
  397. X    fclose(n);
  398. X    unlink(name1);
  399. X    link(name2, name1);
  400. X    unlink(name2);
  401. X    unlock(name1);
  402. X    for(i = 0; i < 3; i++)
  403. X        close(i);
  404. X    open("/dev/null", 2);
  405. X    dup(0);
  406. X    dup(0);
  407. X    if(*qm && (sp = getservbyname("qmaster", "udp")) &&
  408. X        (hp = gethostbyname(qm)) &&
  409. X        (i = socket(AF_INET, SOCK_DGRAM, 0)) >= 0) {
  410. X        *buf = 0;
  411. X        if(unid)
  412. X            sprintf(buf + 1, "%ld", unid);
  413. X        else
  414. X            sprintf(buf + 1, "%d", pid);
  415. X        sin2.sin_family = hp->h_addrtype;
  416. X        sin2.sin_port = sp->s_port;
  417. X        bcopy(hp->h_addr, &sin2.sin_addr, hp->h_length);
  418. X        sendto(i, buf, 2 + strlen(buf + 1), 0, &sin2, sizeof(sin2));
  419. X        close(i);
  420. X    }
  421. X    if(mode == QM_BATCH) {
  422. X        int sentsome = 0;
  423. X        char *p;
  424. X
  425. X        sprintf(name1, "%s@%s", user, host);
  426. X        name2[0] = '\0';
  427. X        for(i = 0; i < sargc - 1; i++) {
  428. X            strcat(name2, sargv[i]);
  429. X            strcat(name2, " ");
  430. X        }
  431. X        strcat(name2, sargv[sargc - 1]);
  432. X        sprintf(buf, "%s/%d.batch", SPOOLDIR, pid);
  433. X        close(0);
  434. X        open(buf, 0);
  435. X        unlink(buf);
  436. X        if(!fstat(0, &statb) && statb.st_size) {
  437. X            sprintf(buf, "batch job %d output (%s)", pid, name2);
  438. X            if(!vfork()) {
  439. X                setuid(uid);
  440. X                execl("/usr/ucb/Mail", "Mail", "-s",
  441. X                    buf, name1, 0);
  442. X                exit(1);
  443. X            }
  444. X            wait(0);
  445. X            sentsome++;
  446. X        }
  447. X        sprintf(buf, "%s/%d.ebatch", SPOOLDIR, pid);
  448. X        close(0);
  449. X        open(buf, 0);
  450. X        unlink(buf);
  451. X        if(!fstat(0, &statb) && statb.st_size) {
  452. X            sprintf(buf, "batch job %d errors (%s)", pid, name2);
  453. X            if(!vfork()) {
  454. X                setuid(uid);
  455. X                execl("/usr/ucb/Mail", "Mail", "-s",
  456. X                    buf, name1, 0);
  457. X                exit(1);
  458. X            }
  459. X            wait(0);
  460. X            sentsome++;
  461. X        }
  462. X        close(0);
  463. X        dup(1);
  464. X        sprintf(buf, "%s/%d", SPOOLDIR, pid);
  465. X        unlink(buf);
  466. X        setuid(uid);
  467. X        if(!(p = getenv("QNOTIFY")))
  468. X            p = "mail";
  469. X        if(!strcmp(p, "send") || !strcmp(p,"saml")||!strcmp(p,"soml")) {
  470. X            sprintf(buf, "exec /usr/lib/sendmail -S -eq %s", name1);
  471. X            i = 1;
  472. X            if(n = popen(buf, "w")) {
  473. X            fprintf(n, "Your batch job %d is finished.\n", pid);
  474. X            fprintf(n, "\"%s\"\n", name2);
  475. X            i = pclose(n);
  476. X            }
  477. X        }
  478. X        if(!sentsome &&
  479. X(!strcmp(p, "mail") || !strcmp(p, "saml") || (!strcmp(p, "soml") && i))) {
  480. X            sprintf(buf, "exec /usr/ucb/Mail -s 'batch job' %s",
  481. X                name1);
  482. X            if(n = popen(buf, "w")) {
  483. X            fprintf(n, "Your batch job %d is finished.\n", pid);
  484. X            fprintf(n, "\"%s\"\n", name2);
  485. X            pclose(n);
  486. X            }
  487. X        }
  488. X    }
  489. X    else {
  490. X        sprintf(buf, "%s/%d", SPOOLDIR, pid);
  491. X        unlink(buf);
  492. X    }
  493. X}
  494. X
  495. Xbyebye()
  496. X{    
  497. X    if(pid) {
  498. X        killpg(pid, SIGHUP);
  499. X        killpg(pid, SIGCONT);
  500. X        kill(pid, SIGCONT);
  501. X        killpg(pid, 9);
  502. X        kill(pid, 9);
  503. X    }
  504. X    signal(SIGTERM, byebye);
  505. X}
  506. X
  507. Xcatch()
  508. X{
  509. X    signal(SIGALRM, catch);
  510. X}
  511. X
  512. Xint running = 1;
  513. X
  514. Xpcontrol()
  515. X{
  516. X    int load;
  517. X
  518. X    if(maxtime && time(0) - starttime > maxtime)
  519. X        byebye();
  520. X    if(minload != maxload) {
  521. X        load = getload();
  522. X        if(running) {
  523. X            if(load >= maxload) {
  524. X                running = 0;
  525. X                killpg(pid, SIGSTOP);
  526. X            }
  527. X        }
  528. X        else {
  529. X            if(load <= minload) {
  530. X                running = 1;
  531. X                killpg(pid, SIGCONT);
  532. X            }
  533. X        }
  534. X    }
  535. X    signal(SIGALRM, pcontrol);
  536. X    alarm(60);
  537. X}
  538. X
  539. Xwaitrun(q)
  540. Xchar *q;
  541. X{
  542. X    int i, pid;
  543. X    char buf[BUFSIZ], p[BUFSIZ];
  544. X    FILE *n;
  545. X
  546. X    signal(SIGALRM, catch);
  547. X    sprintf(p, "%s/%s", SPOOLDIR, q);
  548. X    pid = getpid();
  549. X    lock(p);
  550. X    if(!(n = fopen(p, "a"))) {
  551. X        unlock(p);
  552. X        exit(1);
  553. X    }
  554. X    fprintf(n, "%d\n", pid);
  555. X    fclose(n);
  556. X    unlock(p);
  557. X    while(1) {
  558. X        lock(p);
  559. X        if(!(n = fopen(p, "r"))) {
  560. X            unlock(p);
  561. X            exit(1);
  562. X        }
  563. X        for(i = 0; i < maxrun; i++) {
  564. X            fgets(buf, sizeof(buf), n);
  565. X            buf[strlen(buf)-1] = '\0';
  566. X            if(atoi(buf) == pid) {
  567. X                fclose(n);
  568. X                unlock(p);
  569. X                return;
  570. X            }
  571. X        }
  572. X        fclose(n);
  573. X        unlock(p);
  574. X        alarm(300);
  575. X        pause();
  576. X        alarm(0);
  577. X    }
  578. X}
  579. END_OF_FILE
  580. if test 11727 -ne `wc -c <'enqueue.c'`; then
  581.     echo shar: \"'enqueue.c'\" unpacked with wrong size!
  582. fi
  583. # end of 'enqueue.c'
  584. fi
  585. if test -f 'qmaster.c' -a "${1}" != "-c" ; then 
  586.   echo shar: Will not clobber existing file \"'qmaster.c'\"
  587. else
  588. echo shar: Extracting \"'qmaster.c'\" \(19051 characters\)
  589. sed "s/^X//" >'qmaster.c' <<'END_OF_FILE'
  590. X/* Copyright 1990  The President and Fellows of Harvard University
  591. X
  592. XPermission to use, copy, modify, and distribute this program for any
  593. Xpurpose and without fee is hereby granted, provided that this
  594. Xcopyright and permission notice appear on all copies and supporting
  595. Xdocumentation, the name of Harvard University not be used in advertising
  596. Xor publicity pertaining to distribution of the program, or to results
  597. Xderived from its use, without specific prior written permission, and notice
  598. Xbe given in supporting documentation that copying and distribution is by
  599. Xpermission of Harvard University.  Harvard University makes no
  600. Xrepresentations about the suitability of this software for any purpose.
  601. XIt is provided "as is" without express or implied warranty.    */
  602. X
  603. X
  604. X/* qmaster.c - Dan Lanciani '89 */
  605. X
  606. X/*
  607. Xrefresh hosts less
  608. Xpick up strays
  609. X*/
  610. X
  611. X#include <stdio.h>
  612. X#include <sys/types.h>
  613. X#include <sys/socket.h>
  614. X#include <sys/wait.h>
  615. X#include <sys/time.h>
  616. X#include <netinet/in.h>
  617. X#include <netdb.h>
  618. X#include <errno.h>
  619. X#include <signal.h>
  620. X#include <sgtty.h>
  621. X#include <setjmp.h>
  622. X
  623. X#include "queue.h"
  624. X
  625. X#define GARBAGE (1*60)
  626. X#define HOLDDOWN (2*60)
  627. X#define HOSTPOLLTIME (5*60)
  628. X#define DEADMAN (1*60)
  629. X
  630. Xstruct hostinfo {
  631. X    struct hostinfo *hi_next;
  632. X    char *hi_name;
  633. X    struct sockaddr_in hi_addr;
  634. X    int hi_equiv;
  635. X    int hi_load;
  636. X    time_t hi_timestamp;
  637. X    int hi_dead;
  638. X    int hi_queued;
  639. X} *hostinfo;
  640. X
  641. Xstruct userinfo {
  642. X    struct userinfo *ui_next;
  643. X    long ui_unid;
  644. X    struct sockaddr_in ui_addr;
  645. X    char *ui_name;
  646. X    struct proginfo *ui_prog;
  647. X    time_t ui_timestamp;
  648. X    int ui_mark;
  649. X};
  650. X
  651. Xstruct queueinfo {
  652. X    struct queueinfo *qi_next;
  653. X    char *qi_name;
  654. X    int qi_maxrun;
  655. X    int qi_minload;
  656. X    int qi_maxperu;
  657. X    int qi_hcnt;
  658. X    struct hostinfo *qi_hosts[MAXHCNT];
  659. X    struct userinfo *qi_heads[MAXHCNT];
  660. X    struct userinfo *qi_head;
  661. X} *queueinfo;
  662. X
  663. Xstruct proginfo {
  664. X    struct proginfo *pi_next;
  665. X    char *pi_name;
  666. X    struct queueinfo *pi_queue;
  667. X} *proginfo;
  668. X
  669. Xstruct sockaddr_in sin = { AF_INET };
  670. Xtime_t time();
  671. Xextern int errno;
  672. Xint reapchild();
  673. Xlong atol();
  674. X
  675. Xlong
  676. Xnewunid()
  677. X{
  678. X    static int fd = -1;
  679. X    long unid;
  680. X    char buf[100];
  681. X
  682. X    if(fd < 0) {
  683. X        if((fd = open(UNID, 2)) < 0) {
  684. X            if((fd = creat(UNID, 0644)) < 0) {
  685. X                perror("creat");
  686. X                exit(1);
  687. X            }
  688. X            sprintf(buf, "%ld", FIRSTUNID);
  689. X            write(fd, buf, strlen(buf) + 1);
  690. X            close(fd);
  691. X            if((fd = open(UNID, 2)) < 0) {
  692. X                perror("open");
  693. X                exit(1);
  694. X            }
  695. X        }
  696. X    }
  697. X    lseek(fd, 0L, 0);
  698. X    read(fd, buf, sizeof(buf));
  699. X    unid = atol(buf) + 1;
  700. X    if(!isunid(unid))
  701. X        unid = FIRSTUNID;
  702. X    sprintf(buf, "%ld", unid);
  703. X    lseek(fd, 0L, 0);
  704. X    write(fd, buf, strlen(buf) + 1);
  705. X    return(unid);
  706. X}
  707. X
  708. X#define refresh(hi) \
  709. X    if(time((time_t)0) - (hi)->hi_timestamp > HOSTPOLLTIME) { \
  710. X        (hi)->hi_load = getrload((hi)->hi_name); \
  711. X        (hi)->hi_timestamp = time((time_t)0); \
  712. X    } \
  713. X    else
  714. X
  715. Xstruct hostinfo *
  716. Xhibyname(name)
  717. Xregister char *name;
  718. X{
  719. X    register struct hostinfo *hi;
  720. X    register struct hostent *hp;
  721. X
  722. X    for(hi = hostinfo; hi; hi = hi->hi_next)
  723. X        if(!strcmp(hi->hi_name, name)) {
  724. X            refresh(hi);
  725. X            return(hi);
  726. X        }
  727. X    if(!(hp = gethostbyname(name)))
  728. X        return(0);
  729. X    hi = (struct hostinfo *)malloc(sizeof(struct hostinfo));
  730. X    hi->hi_name = newstring(name);
  731. X    if(infile(EQUIVFILE, hi->hi_name))
  732. X        hi->hi_equiv = 1;
  733. X    else
  734. X        hi->hi_equiv = 0;
  735. X    hi->hi_addr.sin_family = hp->h_addrtype;
  736. X    bcopy(hp->h_addr, &hi->hi_addr.sin_addr, hp->h_length);
  737. X    hi->hi_load = getrload(name);
  738. X    hi->hi_timestamp = time((time_t)0);
  739. X    hi->hi_dead = 0;
  740. X    hi->hi_queued = 0;
  741. X    hi->hi_next = hostinfo;
  742. X    hostinfo = hi;
  743. X    return(hi);
  744. X}
  745. X
  746. Xstruct hostinfo *
  747. Xhibyaddr(addr)
  748. Xregister struct sockaddr_in *addr;
  749. X{
  750. X    register struct hostinfo *hi;
  751. X    register struct hostent *hp;
  752. X
  753. X    for(hi = hostinfo; hi; hi = hi->hi_next)
  754. X        if(addr->sin_addr.s_addr == hi->hi_addr.sin_addr.s_addr) {
  755. X            refresh(hi);
  756. X            return(hi);
  757. X        }
  758. X    if(!(hp=gethostbyaddr(&addr->sin_addr, sizeof(struct in_addr),AF_INET)))
  759. X        return(0);
  760. X    hi = (struct hostinfo *)malloc(sizeof(struct hostinfo));
  761. X    hi->hi_name = newstring(hp->h_name);
  762. X    if(index(hi->hi_name, '.'))
  763. X        *index(hi->hi_name, '.') = '\0';
  764. X    if(infile(EQUIVFILE, hi->hi_name))
  765. X        hi->hi_equiv = 1;
  766. X    else
  767. X        hi->hi_equiv = 0;
  768. X    hi->hi_addr.sin_family = hp->h_addrtype;
  769. X    bcopy(hp->h_addr, &hi->hi_addr.sin_addr, hp->h_length);
  770. X    hi->hi_load = getrload(hi->hi_name);
  771. X    hi->hi_timestamp = time((time_t)0);
  772. X    hi->hi_dead = 0;
  773. X    hi->hi_queued = 0;
  774. X    hi->hi_next = hostinfo;
  775. X    hostinfo = hi;
  776. X    return(hi);
  777. X}
  778. X
  779. Xstruct userinfo *
  780. Xuibyunid(unid, unlnk)
  781. Xregister long unid;
  782. X{
  783. X    register struct queueinfo *qi;
  784. X    register struct userinfo *ui, *pu;
  785. X    register int i;
  786. X
  787. X    for(qi = queueinfo; qi; qi = qi->qi_next) {
  788. X        for(i = 0; i < qi->qi_hcnt; i++)
  789. X            for(pu = 0, ui =qi->qi_heads[i];ui;pu=ui,ui=ui->ui_next)
  790. X                if(unid == ui->ui_unid) {
  791. X                    if(unlnk)
  792. X                    if(pu)
  793. X                        pu->ui_next = ui->ui_next;
  794. X                    else
  795. X                        qi->qi_heads[i] = ui->ui_next;
  796. X                    return(ui);
  797. X                }
  798. X        for(pu = 0, ui = qi->qi_head; ui; pu = ui, ui = ui->ui_next)
  799. X            if(unid == ui->ui_unid) {
  800. X                if(unlnk)
  801. X                    if(pu)
  802. X                        pu->ui_next = ui->ui_next;
  803. X                    else
  804. X                        qi->qi_head = ui->ui_next;
  805. X                return(ui);
  806. X            }
  807. X    }
  808. X    return(0);
  809. X}
  810. X
  811. Xstruct queueinfo *
  812. Xqibyname(name)
  813. Xregister char *name;
  814. X{
  815. X    register struct queueinfo *qi;
  816. X
  817. X    for(qi = queueinfo; qi; qi = qi->qi_next)
  818. X        if(!strcmp(qi->qi_name, name))
  819. X            return(qi);
  820. X    return(0);
  821. X}
  822. X
  823. Xstruct proginfo *
  824. Xpibyname(name)
  825. Xregister char *name;
  826. X{
  827. X    register struct proginfo *pi;
  828. X    register struct queueinfo *qi;
  829. X    register int i;
  830. X
  831. X    for(pi = proginfo; pi; pi = pi->pi_next)
  832. X        if(!strcmp(pi->pi_name, name))
  833. X            return(pi);
  834. X    if(readconf(name))
  835. X        return(0);
  836. X    if(!(qi = qibyname(queue))) {
  837. X        qi = (struct queueinfo *)malloc(sizeof(struct queueinfo));
  838. X        qi->qi_name = newstring(queue);
  839. X        qi->qi_maxrun = maxrun;
  840. X        qi->qi_minload = minload;
  841. X        qi->qi_maxperu = maxperu;
  842. X        for(i = 0; i < hcnt; i++) {
  843. X            if(!(qi->qi_hosts[i] = hibyname(hosts[i]))) {
  844. X                i--;
  845. X                hcnt--;
  846. X                continue;
  847. X            }
  848. X            qi->qi_hosts[i]->hi_queued = 1;
  849. X            qi->qi_heads[i] = 0;
  850. X        }
  851. X        if(hcnt <= 0) {
  852. X            free(qi);
  853. X            return(0);
  854. X        }
  855. X        qi->qi_hcnt = hcnt;
  856. X        qi->qi_head = 0;
  857. X        qi->qi_next = queueinfo;
  858. X        queueinfo = qi;
  859. X    }
  860. X    pi = (struct proginfo *)malloc(sizeof(struct proginfo));
  861. X    pi->pi_name = newstring(name);
  862. X    pi->pi_queue = qi;
  863. X    pi->pi_next = proginfo;
  864. X    proginfo = pi;
  865. X    return(pi);
  866. X}
  867. X
  868. Xchar *
  869. Xgetln(n, buf)
  870. Xchar *buf;
  871. X{
  872. X    register char *p = buf;
  873. X
  874. X    do
  875. X        if(read(n, p, 1) != 1)
  876. X            return(0);
  877. X    while(*p++ != '\n');
  878. X    p[-1] = 0;
  879. X    return(buf);
  880. X}
  881. X
  882. Xchar *
  883. Xgetstr(n, buf)
  884. Xchar *buf;
  885. X{
  886. X    register char *p = buf;
  887. X
  888. X    do
  889. X        if(read(n, p, 1) != 1)
  890. X            return(0);
  891. X    while(*p++);
  892. X    return(buf);
  893. X}
  894. X
  895. Xstruct timeval timeout = { 60, 0 };
  896. Xchar myname[100];
  897. Xint s, u, on = 1;
  898. Xint strays, dropouts, upstarts;
  899. Xint deadmans, enqueues, dequeues, showqueues;
  900. Xjmp_buf jb;
  901. X
  902. Xcatch()
  903. X{
  904. X    deadmans++;
  905. X    longjmp(jb, 1);
  906. X}
  907. X
  908. Xmain(argc, argv, envp)
  909. Xchar **argv, **envp;
  910. X{
  911. X    register struct servent *sp;
  912. X    register struct queueinfo *qi;
  913. X    register struct userinfo *ui, *pu;
  914. X    register struct hostinfo *hi, *garbagehi = 0;
  915. X    register int i, j, qport;
  916. X    long probe, mask, unid;
  917. X    char buf[BUFSIZ];
  918. X    time_t garbage = 0;
  919. X
  920. X    gethostname(myname, sizeof(myname));
  921. X    if(argc > 1) {
  922. X        for(i = 1; i < argc; i++)
  923. X            if(!strcmp(argv[i], myname))
  924. X                goto iammaster;
  925. Xfprintf(stderr, "I am not master %s\n", myname);
  926. X        exit(0);
  927. X    }
  928. Xiammaster:
  929. X#ifndef DEBUG
  930. X    for(s = 0; s < 30; s++)
  931. X        close(s);
  932. X    if((s = open("/dev/tty", 2)) >= 0) {
  933. X        ioctl(s, TIOCNOTTY, 0);
  934. X        close(s);
  935. X    }
  936. X    s = open("/", 0);
  937. X    dup(s);
  938. X    dup(s);
  939. X    signal(SIGALRM, SIG_IGN);
  940. X    signal(SIGPIPE, SIG_IGN);
  941. X    if(fork())
  942. X        exit(0);
  943. X#endif
  944. X
  945. X    while(fork()) {
  946. X        wait((int *)0);
  947. X        sleep(15);
  948. X    }
  949. X    if(!(sp = getservbyname("queue", "tcp"))) {
  950. X        fprintf(stderr, "queue/tcp: Bad service?!?\n");
  951. X        exit(1);
  952. X    }
  953. X    qport = sp->s_port;
  954. X    if(!(sp = getservbyname("qmaster", "tcp"))) {
  955. X        fprintf(stderr, "qmaster/tcp: Bad service?!?\n");
  956. X        exit(1);
  957. X    }
  958. X    if((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
  959. X        perror("socket");
  960. X        exit(1);
  961. X    }
  962. X    setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
  963. X    setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on));
  964. X    sin.sin_port = sp->s_port;
  965. X    if(bind(s, &sin, sizeof(sin))) {
  966. X        perror("bind");
  967. X        exit(1);
  968. X    }
  969. X    listen(s, 10);
  970. X    mask = (1L << s);
  971. X    if(!(sp = getservbyname("qmaster", "udp"))) {
  972. X        fprintf(stderr, "qmaster/udp: Bad service?!?\n");
  973. X        exit(1);
  974. X    }
  975. X    if((u = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
  976. X        perror("socket");
  977. X        exit(1);
  978. X    }
  979. X    setsockopt(u, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
  980. X    sin.sin_port = sp->s_port;
  981. X    if(bind(u, &sin, sizeof(sin))) {
  982. X        perror("bind");
  983. X        exit(1);
  984. X    }
  985. X    mask |= (1L << u);
  986. X    while(1) {
  987. X        struct sockaddr_in from;
  988. X        int s0, fromlen;
  989. X        signal(SIGCHLD, reapchild);
  990. X        probe = mask;
  991. X        if(select(32, &probe, 0, 0, &timeout) < 0) {
  992. X            sleep(1);
  993. X            continue;
  994. X        }
  995. X        if(probe & (1L << s)) {
  996. X            fromlen = sizeof(from);
  997. X            if((s0 = accept(s, &from, &fromlen)) < 0) {
  998. X                if(errno = EINTR)
  999. X                    continue;
  1000. X                perror("accept");
  1001. X                sleep(1);
  1002. X                continue;
  1003. X            }
  1004. X            if(from.sin_family != AF_INET ||
  1005. X                htons((u_short)from.sin_port) >= IPPORT_RESERVED
  1006. X                || htons((u_short)from.sin_port)
  1007. X                < IPPORT_RESERVED / 2) {
  1008. X                close(s0);
  1009. X                continue;
  1010. X            }
  1011. X            dostream(s0, &from);
  1012. X        }
  1013. X        if(probe & (1L << u))
  1014. X            dodgram(u);
  1015. X        for(hi = hostinfo; hi; hi = hi->hi_next)
  1016. X            if(hi->hi_queued)
  1017. X                refresh(hi);
  1018. X        for(qi = queueinfo; qi; qi = qi->qi_next) {
  1019. X            if(!qi->qi_head)
  1020. X                continue;
  1021. X            for(i = 0; i < qi->qi_hcnt; i++)
  1022. X                for(j = i + 1; j < qi->qi_hcnt; j++)
  1023. X                    if(qi->qi_hosts[j]->hi_load <
  1024. X                        qi->qi_hosts[i]->hi_load) {
  1025. X                        ui = qi->qi_heads[i];
  1026. X                        hi = qi->qi_hosts[i];
  1027. X                        qi->qi_heads[i]=qi->qi_heads[j];
  1028. X                        qi->qi_hosts[i]=qi->qi_hosts[j];
  1029. X                        qi->qi_heads[j] = ui;
  1030. X                        qi->qi_hosts[j] = hi;
  1031. X                    }
  1032. X            for(i = 0; i < qi->qi_hcnt && qi->qi_head; i++) {
  1033. X                if(qi->qi_hosts[i]->hi_dead)
  1034. X                    continue;
  1035. X                if(qi->qi_minload &&
  1036. X                    qi->qi_hosts[i]->hi_load>qi->qi_minload)
  1037. X                    continue;
  1038. X                j = 0;
  1039. X                for(ui = qi->qi_heads[i]; ui; ui = ui->ui_next)
  1040. X                    j++;
  1041. X                while(j < qi->qi_maxrun && (ui = qi->qi_head)) {
  1042. X                    qi->qi_head = ui->ui_next;
  1043. X                    ui->ui_next = qi->qi_heads[i];
  1044. X                    qi->qi_heads[i] = ui;
  1045. X                    sprintf(buf, "\1%s",
  1046. X                        qi->qi_hosts[i]->hi_name);
  1047. X                    sendto(u, buf, strlen(buf) + 1, 0,
  1048. X                        &ui->ui_addr,
  1049. X                        sizeof(ui->ui_addr));
  1050. X                    ui->ui_timestamp = time((time_t)0);
  1051. X                    j++;
  1052. X                }
  1053. X            }
  1054. X            *buf = 0;
  1055. X            for(ui = qi->qi_head; ui; ui = ui->ui_next)
  1056. X                sendto(u, buf, 1, 0, &ui->ui_addr,
  1057. X                    sizeof(ui->ui_addr));
  1058. X        }
  1059. X        if(time((time_t)0) - garbage > GARBAGE) {
  1060. X            int rp = IPPORT_RESERVED - 1;
  1061. X            register char *p;
  1062. X            if(!garbagehi)
  1063. X                garbagehi = hostinfo;
  1064. X            if(!(hi = garbagehi) || !hi->hi_queued)
  1065. X                goto out;
  1066. X            hi->hi_addr.sin_port = qport;
  1067. X            if((i = rresvport(&rp)) < 0)
  1068. X                goto out;
  1069. X            if(setjmp(jb))
  1070. X                goto out2;
  1071. X#ifdef DEBUG
  1072. X            fprintf(stderr, "gc host %s\n", hi->hi_name);
  1073. X#endif
  1074. X            signal(SIGALRM, catch);
  1075. X            alarm(DEADMAN);
  1076. X            if(connect(i, &hi->hi_addr, sizeof(hi->hi_addr))) {
  1077. X                hi->hi_dead = 1;
  1078. X                goto out2;
  1079. X            }
  1080. X            hi->hi_dead = 0;
  1081. X            alarm(DEADMAN);
  1082. X            write(i, "\1", 2);
  1083. X            alarm(DEADMAN);
  1084. X            while(getln(i, buf)) {
  1085. X                alarm(0);
  1086. X                if(strncmp(buf, "Queue: ", 7))
  1087. X                    goto out2;
  1088. X                if(!(p = index(buf + 7, ',')))
  1089. X                    goto out2;
  1090. X                *p = 0;
  1091. X#ifdef DEBUG
  1092. X                fprintf(stderr, "queue = %s\n", buf);
  1093. X#endif
  1094. X                if(!(qi = qibyname(buf + 7))) {
  1095. X                skip:
  1096. X                    alarm(DEADMAN);
  1097. X                    while(getln(i, buf) && *buf)
  1098. X                        alarm(DEADMAN);
  1099. X                    continue;
  1100. X                }
  1101. X                for(j = 0; j < qi->qi_hcnt; j++)
  1102. X                    if(hi == qi->qi_hosts[j])
  1103. X                        break;
  1104. X                if(j >= qi->qi_hcnt)
  1105. X                    goto skip;
  1106. X                for(ui = qi->qi_heads[j]; ui; ui = ui->ui_next)
  1107. X                    ui->ui_mark = 0;
  1108. X                alarm(DEADMAN);
  1109. X                if(!getln(i, buf) || strncmp(buf, "Pid", 3))
  1110. X                    goto out2;
  1111. X#ifdef DEBUG
  1112. X                fprintf(stderr, "header = %s\n", buf);
  1113. X#endif
  1114. X                while(getln(i, buf) && *buf) {
  1115. X                    alarm(0);
  1116. X#ifdef DEBUG
  1117. X                fprintf(stderr, "entry = %s\n", buf);
  1118. X#endif
  1119. X                    unid = atol(buf);
  1120. X                    for(ui = qi->qi_heads[j]; ui;
  1121. X                        ui = ui->ui_next)
  1122. X                        if(ui->ui_unid == unid) {
  1123. X                            /* XXX check mark */
  1124. X                            ui->ui_mark = 1;
  1125. X                            break;
  1126. X                        }
  1127. X                    if(ui) {
  1128. X                        alarm(DEADMAN);
  1129. X                        continue;
  1130. X                    }
  1131. X                    if(ui = uibyunid(unid, 1)) {
  1132. X                        ui->ui_mark = 1;
  1133. X                        ui->ui_next = qi->qi_heads[j];
  1134. X                        qi->qi_heads[j] = ui;
  1135. X                        upstarts++;
  1136. X                        alarm(DEADMAN);
  1137. X                        continue;
  1138. X                    }
  1139. X                    /* XXX add it */
  1140. X                    strays++;
  1141. X                    alarm(DEADMAN);
  1142. X                }
  1143. X                alarm(0);
  1144. X            again:
  1145. X                for(pu = 0, ui = qi->qi_heads[j]; ui;
  1146. X                pu = ui, ui = ui->ui_next)
  1147. X                if(!ui->ui_mark && time((time_t)0) -
  1148. X                    ui->ui_timestamp > HOLDDOWN) {
  1149. X                    if(pu)
  1150. X                        pu->ui_next = ui->ui_next;
  1151. X                    else
  1152. X                        qi->qi_heads[j] = ui->ui_next;
  1153. X                    free(ui->ui_name);
  1154. X                    free(ui);
  1155. X                    dropouts++;
  1156. X                    goto again;
  1157. X                }
  1158. X                alarm(DEADMAN);
  1159. X            }
  1160. X        out2:
  1161. X            close(i);
  1162. X        out:
  1163. X            alarm(0);
  1164. X            signal(SIGALRM, SIG_DFL);
  1165. X            garbagehi = hi->hi_next;
  1166. X            garbage = time((time_t)0);
  1167. X        }
  1168. X    }
  1169. X}
  1170. X
  1171. Xreapchild()
  1172. X{
  1173. X    union wait status;
  1174. X
  1175. X    while(wait3(&status, WNOHANG, 0) > 0);
  1176. X}
  1177. X
  1178. Xdostream(s, sin)
  1179. Xregister int s;
  1180. Xstruct sockaddr_in *sin;
  1181. X{
  1182. X    register struct hostinfo *hi;
  1183. X
  1184. X    if(!(hi = hibyaddr(sin)))
  1185. X        goto bad;
  1186. X    if(setjmp(jb))
  1187. X        goto bad;
  1188. X    signal(SIGALRM, catch);
  1189. X    alarm(DEADMAN);
  1190. X    if(!strcmp(hi->hi_name, "localhost") || !strcmp(hi->hi_name, myname))
  1191. X        qservice(s, myname, sin, 1);
  1192. X    else if(hi->hi_equiv)
  1193. X        qservice(s, hi->hi_name, sin, 0);
  1194. Xbad:
  1195. X    alarm(0);
  1196. X    signal(SIGALRM, SIG_IGN);
  1197. X    close(s);
  1198. X}
  1199. X
  1200. Xdodgram(s)
  1201. Xregister int s;
  1202. X{
  1203. X    char buf[BUFSIZ];
  1204. X    struct sockaddr_in sin;
  1205. X    int len, fromlen = sizeof(sin);
  1206. X    register struct userinfo *ui;
  1207. X
  1208. X    if(setjmp(jb))
  1209. X        goto bad;
  1210. X    signal(SIGALRM, catch);
  1211. X    alarm(10);
  1212. X    if((len = recvfrom(s, buf, sizeof(buf), 0, &sin, sizeof(sin))) <= 0)
  1213. X        goto bad;
  1214. X    alarm(0);
  1215. X    switch(*buf&0377) {
  1216. X
  1217. X        case 0:
  1218. X            if(ui = uibyunid(atol(buf + 1), 1)) {
  1219. X                free(ui->ui_name);
  1220. X                free(ui);
  1221. X            }
  1222. X            break;
  1223. X    }
  1224. Xbad:
  1225. X    alarm(0);
  1226. X    signal(SIGALRM, SIG_IGN);
  1227. X}
  1228. X
  1229. Xqservice(s, host, sin, local)
  1230. Xregister int s;
  1231. Xregister char *host;
  1232. Xregister struct sockaddr_in *sin;
  1233. Xregister int local;
  1234. X{
  1235. X    char request;
  1236. X
  1237. X    if(read(s, &request, 1) != 1)
  1238. X        return;
  1239. X    switch(request&0377) {
  1240. X
  1241. X        case 0:
  1242. X            menqueue(s, host, sin, local);
  1243. X            enqueues++;
  1244. X            break;
  1245. X
  1246. X        case 1:
  1247. X            mshowqueue(s, host, sin, local);
  1248. X            showqueues++;
  1249. X            break;
  1250. X
  1251. X        case 2:
  1252. X            mdequeue(s, host, sin, local);
  1253. X            dequeues++;
  1254. X            break;
  1255. X    }
  1256. X}
  1257. X
  1258. Xmenqueue(s, host, sin, local)
  1259. Xregister int s;
  1260. Xregister char *host;
  1261. Xregister struct sockaddr_in *sin;
  1262. Xregister int local;
  1263. X{
  1264. X    char what[BUFSIZ], prog[BUFSIZ], user[BUFSIZ], buf[BUFSIZ];
  1265. X    register struct proginfo *pi;
  1266. X    register struct queueinfo *qi;
  1267. X    register struct userinfo *ui, *u2;
  1268. X    register int i, j;
  1269. X
  1270. X    if(!getstr(s, what) || !getstr(s, prog) || !getstr(s, user))
  1271. X        return;
  1272. X    alarm(0);
  1273. X    if(!(pi = pibyname(prog)))
  1274. X        return;
  1275. X    qi = pi->pi_queue;
  1276. X    if(qi->qi_maxperu) {
  1277. X        for(i = j = 0; i < qi->qi_hcnt; i++)
  1278. X            for(ui = qi->qi_heads[i]; ui; ui = ui->ui_next)
  1279. X                if(!strcmp(user, ui->ui_name))
  1280. X                    j++;
  1281. X        for(ui = qi->qi_head; ui; ui = ui->ui_next)
  1282. X            if(!strcmp(user, ui->ui_name))
  1283. X                j++;
  1284. X        if(j >= qi->qi_maxperu) {
  1285. X            *buf = 1;
  1286. X            sprintf(buf + 1, "You already have %d jobs queued", j);
  1287. X            alarm(DEADMAN);
  1288. X            write(s, buf, strlen(buf + 1) + 2);
  1289. X            return;
  1290. X        }
  1291. X    }
  1292. X    ui = (struct userinfo *)malloc(sizeof(struct userinfo));
  1293. X    *buf = 0;
  1294. X    sprintf(buf + 1, "%ld", ui->ui_unid = newunid());
  1295. X    alarm(DEADMAN);
  1296. X    write(s, buf, strlen(buf + 1) + 2);
  1297. X    alarm(0);
  1298. X    ui->ui_addr = *sin;
  1299. X    ui->ui_addr.sin_port = atoi(what);
  1300. X    ui->ui_name = newstring(user);
  1301. X    ui->ui_prog = pi;
  1302. X    ui->ui_next = 0;
  1303. X    if(u2 = qi->qi_head) {
  1304. X        while(u2->ui_next)
  1305. X            u2 = u2->ui_next;
  1306. X        u2->ui_next = ui;
  1307. X    }
  1308. X    else
  1309. X        qi->qi_head = ui;
  1310. X}
  1311. X
  1312. Xmshowqueue(s, host, sin, local)
  1313. Xregister int s;
  1314. Xregister char *host;
  1315. Xregister struct sockaddr_in *sin;
  1316. Xregister int local;
  1317. X{
  1318. X    register int i;
  1319. X    register struct queueinfo *qi;
  1320. X    register struct userinfo *ui;
  1321. X    register struct hostinfo *hi;
  1322. X    char buf[BUFSIZ];
  1323. X
  1324. X    alarm(0);
  1325. X    if(fork())
  1326. X        return;
  1327. X    for(i = 0; i < 3; i++)
  1328. X        dup2(s, i);
  1329. X    if(!getstr(s, buf))
  1330. X        exit(1);
  1331. X    if(s > 2)
  1332. X        close(s);
  1333. X    if(!strcmp(buf, "test")) {
  1334. X        printf("Debugging information:\n");
  1335. X        printf("Strays: %d, Dropouts: %d, Upstarts: %d\n",
  1336. X            strays, dropouts, upstarts);
  1337. X        printf("Deadmans: %d\n", deadmans);
  1338. X        printf("Enqueues: %d, Dequeues: %d, Showqueues: %d\n",
  1339. X                enqueues, dequeues, showqueues);
  1340. X        printf("Host\t\tAddr\t\tEquiv\tLoad\tDead\tQueued\n");
  1341. X        for(hi = hostinfo; hi; hi = hi->hi_next)
  1342. X            printf("%-16s%s\t%d\t%d\t%d\t%d\n",
  1343. X                hi->hi_name, inet_ntoa(hi->hi_addr.sin_addr),
  1344. X                hi->hi_equiv, hi->hi_load,
  1345. X                hi->hi_dead, hi->hi_queued);
  1346. X        printf("\n");
  1347. X    }
  1348. X    printf("Queue: %s\n", buf);
  1349. X    printf("Pid\t\tState\tUser\tHost\tCommand\n");
  1350. X    fflush(stdout);
  1351. X    if(!(qi = qibyname(buf)))
  1352. X        exit(1);
  1353. X    for(i = 0; i < qi->qi_hcnt; i++)
  1354. X        for(ui = qi->qi_heads[i]; ui; ui = ui->ui_next) {
  1355. X            printf("%-16ld", ui->ui_unid);
  1356. X            printf("RUN\t");
  1357. X            printf("%s\t", ui->ui_name);
  1358. X            if(hi = hibyaddr(&ui->ui_addr))
  1359. X                printf("%s\t", hi->hi_name);
  1360. X            else
  1361. X            printf("%s\t", inet_ntoa(ui->ui_addr.sin_addr.s_addr));
  1362. X            printf("%s\n", ui->ui_prog->pi_name);
  1363. X        }
  1364. X    for(ui = qi->qi_head; ui; ui = ui->ui_next) {
  1365. X        printf("%-16ld", ui->ui_unid);
  1366. X        printf("WAIT\t");
  1367. X        printf("%s\t", ui->ui_name);
  1368. X        if(hi = hibyaddr(&ui->ui_addr))
  1369. X            printf("%s\t", hi->hi_name);
  1370. X        else
  1371. X        printf("%s\t", inet_ntoa(ui->ui_addr.sin_addr.s_addr));
  1372. X        printf("%s\n", ui->ui_prog->pi_name);
  1373. X    }
  1374. X    fflush(stdout);
  1375. X    exit(0);
  1376. X}
  1377. X
  1378. Xmdequeue(s, host, sin, local)
  1379. Xregister int s;
  1380. Xregister char *host;
  1381. Xregister struct sockaddr_in *sin;
  1382. Xregister int local;
  1383. X{
  1384. X    char what[BUFSIZ], prog[BUFSIZ], user[BUFSIZ], buf[BUFSIZ];
  1385. X    long unid;
  1386. X    register struct proginfo *pi;
  1387. X    register struct queueinfo *qi;
  1388. X    register struct userinfo *ui, *pu;
  1389. X    register int i;
  1390. X
  1391. X    if(!getstr(s, user) || !getstr(s, prog) || !getstr(s, what))
  1392. X        return;
  1393. X    if(!(pi = pibyname(prog)))
  1394. X        return;
  1395. X    qi = pi->pi_queue;
  1396. X    if(strcmp(what, "all"))
  1397. X        unid = atol(what);
  1398. X    else
  1399. X        unid = -1;
  1400. X    alarm(DEADMAN);
  1401. X    for(i = 0; i < qi->qi_hcnt; i++)
  1402. X    top:    for(pu = 0, ui = qi->qi_heads[i]; ui; pu = ui, ui = ui->ui_next)
  1403. X            if(ui->ui_unid == unid ||
  1404. X                (!unid && !strcmp(ui->ui_name, what)) ||
  1405. X                unid == -1) {
  1406. X                if(strcmp(ui->ui_name, user) &&
  1407. X                    strcmp(user, "root")) {
  1408. X                    /*sprintf(buf, "Not owner (%s, %d)\n",
  1409. X                        ui->ui_name, ui->ui_unid);
  1410. X                    write(s, buf, strlen(buf));*/
  1411. X                    continue;
  1412. X                }
  1413. X                alarm(0);
  1414. X                if(pu)
  1415. X                    pu->ui_next = ui->ui_next;
  1416. X                else
  1417. X                    qi->qi_heads[i] = ui->ui_next;
  1418. X                free(ui->ui_name);
  1419. X                free(ui);
  1420. X                alarm(DEADMAN);
  1421. X                /*sprintf(buf, "Killed %d of %s for %s\n",
  1422. X                    ui->ui_unid, ui->ui_name, user);
  1423. X                write(s, buf, strlen(buf));*/
  1424. X                goto top;
  1425. X            }
  1426. X    alarm(DEADMAN);
  1427. Xtop2:    for(pu = 0, ui = qi->qi_head; ui; pu = ui, ui = ui->ui_next)
  1428. X        if(ui->ui_unid == unid ||
  1429. X            (!unid && !strcmp(ui->ui_name, what)) || unid == -1) {
  1430. X            if(strcmp(ui->ui_name, user) &&
  1431. X                strcmp(user, "root")) {
  1432. X                sprintf(buf, "Not owner (%s, %d)\n",
  1433. X                    ui->ui_name, ui->ui_unid);
  1434. X                write(s, buf, strlen(buf));
  1435. X                continue;
  1436. X            }
  1437. X            alarm(0);
  1438. X            if(pu)
  1439. X                pu->ui_next = ui->ui_next;
  1440. X            else
  1441. X                qi->qi_head = ui->ui_next;
  1442. X            *buf = 2;
  1443. X            sendto(u, buf, 1, 0, &ui->ui_addr, sizeof(ui->ui_addr));
  1444. X            free(ui->ui_name);
  1445. X            free(ui);
  1446. X            alarm(DEADMAN);
  1447. X            sprintf(buf, "Killed %d of %s for %s\n",
  1448. X                ui->ui_unid, ui->ui_name, user);
  1449. X            write(s, buf, strlen(buf));
  1450. X            goto top2;
  1451. X        }
  1452. X}
  1453. END_OF_FILE
  1454. if test 19051 -ne `wc -c <'qmaster.c'`; then
  1455.     echo shar: \"'qmaster.c'\" unpacked with wrong size!
  1456. fi
  1457. # end of 'qmaster.c'
  1458. fi
  1459. echo shar: End of archive 3 \(of 3\).
  1460. cp /dev/null ark3isdone
  1461. MISSING=""
  1462. for I in 1 2 3 ; do
  1463.     if test ! -f ark${I}isdone ; then
  1464.     MISSING="${MISSING} ${I}"
  1465.     fi
  1466. done
  1467. if test "${MISSING}" = "" ; then
  1468.     echo You have unpacked all 3 archives.
  1469.     rm -f ark[1-9]isdone
  1470. else
  1471.     echo You still need to unpack the following archives:
  1472.     echo "        " ${MISSING}
  1473. fi
  1474. ##  End of shell archive.
  1475. exit 0
  1476. exit 0 # Just in case...
  1477.